Residual Networks (ResNets) for Image Data (from scratch NumPy + PyTorch)#
ResNets (Residual Networks) are a CNN architecture built around one core idea:
learn residual updates and add them back with a skip connection: (y = x + F(x))
Why this matters:
deeper networks become much easier to optimize (better gradient flow)
blocks can fall back to the identity map when extra depth isn’t needed
This notebook includes a quick CNN recap. For a more CNN-first introduction, see:
../cnn/00_overview.ipynb
Learning goals#
By the end you should be able to:
explain the ResNet block and why skip connections help
implement and train a tiny ResNet-like model in pure NumPy (manual backprop)
implement the same idea in PyTorch and visualize training and predictions
Notebook roadmap#
Data: load a small image dataset (no downloads)
Intuition: convolution, filters, and parameter counts
ResNet intuition: why skip connections help
From scratch (NumPy): Conv2D + ReLU + residual block + training loop
Visual diagnostics (NumPy): filters, feature maps, confusion matrix
Practical (PyTorch): same model idea with autograd
Pitfalls + exercises
import time
import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import os
import plotly.io as pio
from sklearn.datasets import load_digits
from sklearn.metrics import accuracy_score, confusion_matrix
from sklearn.model_selection import train_test_split
try:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset
TORCH_AVAILABLE = True
except Exception as e:
TORCH_AVAILABLE = False
_TORCH_IMPORT_ERROR = e
pio.templates.default = "plotly_white"
pio.renderers.default = os.environ.get("PLOTLY_RENDERER", "notebook")
np.set_printoptions(precision=4, suppress=True)
SEED = 42
rng = np.random.default_rng(SEED)
# --- Run configuration ---
FAST_RUN = True # set False for better accuracy / more epochs
EPOCHS_NUMPY = 10 if FAST_RUN else 40
EPOCHS_TORCH = 10 if FAST_RUN else 30
BATCH_SIZE = 64
LR_NUMPY = 0.05
MOMENTUM = 0.9
WEIGHT_DECAY = 1e-4
LR_TORCH = 1e-3
RUN_GRAD_CHECK = False # optional (slow)
1) Data: a tiny image dataset (8×8 handwritten digits)#
To keep this notebook offline-friendly, we use sklearn.datasets.load_digits():
1 channel (grayscale)
8×8 images
10 classes (0–9)
Even though this is much smaller than ImageNet, it’s perfect for understanding the mechanics of CNNs/ResNets.
digits = load_digits()
X = digits.images.astype(np.float32) # (N, 8, 8)
y = digits.target.astype(np.int64)
# Normalize pixels roughly into [0, 1]
X = X / 16.0
# Add channel dimension: (N, C=1, H, W)
X = X[:, None, :, :]
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.25, random_state=SEED, stratify=y
)
print('X_train', X_train.shape, 'X_test', X_test.shape)
X_train (1347, 1, 8, 8) X_test (450, 1, 8, 8)
# Visualize a grid of sample images
idx = rng.choice(len(X_train), size=36, replace=False)
imgs = X_train[idx, 0] # (36, 8, 8)
labels = y_train[idx]
fig = px.imshow(
imgs,
facet_col=0,
facet_col_wrap=9,
color_continuous_scale='gray',
title='Sample training images (8×8)',
)
# Remove axis ticks for small multiples
for a in fig.layout.annotations:
a.text = ''
fig.update_xaxes(showticklabels=False).update_yaxes(showticklabels=False)
fig.update_layout(coloraxis_showscale=False, margin=dict(l=10, r=10, t=60, b=10))
fig.show()
2) Convolution intuition (filters as pattern detectors)#
In deep learning, “convolution” is usually implemented as cross-correlation (we don’t flip the kernel):
[ y[n, c_{out}, i, j] = b[c_{out}] + \sum_{c_{in}} \sum_{u=0}^{kH-1} \sum_{v=0}^{kW-1} x[n, c_{in}, i+u, j+v], W[c_{out}, c_{in}, u, v] ]
Why this is powerful:
Local receptive fields: each output looks at a small patch.
Weight sharing: the same kernel weights are reused at every location → far fewer parameters.
Key knobs:
Kernel size (e.g. 3×3): how much local context a filter sees at once
Padding: whether we keep spatial size (“same”) or shrink (“valid”)
Stride: step size when sliding the kernel
Channels: multiple input and output feature maps
# Parameter count: dense vs conv
#
# Suppose we map an 8×8 image (64 pixels) to 8 feature maps (still 8×8).
# - A dense layer would connect every input pixel to every output pixel.
# - A conv layer uses a small kernel shared across space.
def n_params_dense(fan_in: int, fan_out: int) -> int:
return fan_in * fan_out + fan_out
def n_params_conv2d(c_in: int, c_out: int, k: int) -> int:
return c_out * (c_in * k * k) + c_out
dense = n_params_dense(64, 64 * 8)
conv = n_params_conv2d(c_in=1, c_out=8, k=3)
_df = pd.DataFrame(
[
{'layer': 'Dense (64 → 512)', 'params': dense},
{'layer': 'Conv2D (1→8, 3×3)', 'params': conv},
]
)
fig = px.bar(
_df,
x='layer',
y='params',
text='params',
title='Parameter count: dense vs convolution (weight sharing)',
)
fig.update_traces(textposition='outside')
fig.update_layout(yaxis_title='number of parameters')
fig.show()
from plotly.subplots import make_subplots
def conv2d_single_channel_naive(img: np.ndarray, kernel: np.ndarray, padding: int = 0, stride: int = 1) -> np.ndarray:
img = np.asarray(img, dtype=np.float32)
kernel = np.asarray(kernel, dtype=np.float32)
H, W = img.shape
kH, kW = kernel.shape
if padding > 0:
img_p = np.pad(img, ((padding, padding), (padding, padding)), mode='constant')
else:
img_p = img
Hp, Wp = img_p.shape
out_h = (Hp - kH) // stride + 1
out_w = (Wp - kW) // stride + 1
out = np.zeros((out_h, out_w), dtype=np.float32)
for i in range(out_h):
for j in range(out_w):
patch = img_p[i*stride:i*stride+kH, j*stride:j*stride+kW]
out[i, j] = float(np.sum(patch * kernel))
return out
# Pick one image
img = X_train[0, 0]
# Two classic "edge" kernels
kx = np.array([[1, 0, -1], [2, 0, -2], [1, 0, -1]], dtype=np.float32)
ky = np.array([[1, 2, 1], [0, 0, 0], [-1, -2, -1]], dtype=np.float32)
fx = conv2d_single_channel_naive(img, kx, padding=1)
fy = conv2d_single_channel_naive(img, ky, padding=1)
fig = make_subplots(rows=1, cols=3, subplot_titles=['Input', 'Horizontal edges', 'Vertical edges'])
fig.add_trace(go.Heatmap(z=img, colorscale='gray', showscale=False), row=1, col=1)
fig.add_trace(go.Heatmap(z=fx, colorscale='RdBu', zmid=0, showscale=False), row=1, col=2)
fig.add_trace(go.Heatmap(z=fy, colorscale='RdBu', zmid=0, showscale=False), row=1, col=3)
fig.update_layout(title='A toy convolution demo (naive implementation)', height=320, margin=dict(l=10, r=10, t=60, b=10))
fig.show()
3) ResNet intuition: why skip connections help#
A common surprise with deep networks is the degradation problem:
as you add more layers, the training error can get worse (even if overfitting isn’t the issue).
A residual block changes the learning target. Instead of learning a full mapping (H(x)), it learns a residual (F(x)):
[ y = x + F(x; heta) ]
Why this helps:
Easy identity: if the extra layers aren’t useful, set (F(x)=0) and the block behaves like the identity.
Better gradient flow: the skip path gives a direct route for gradients.
A quick gradient sketch (informal):
[ rac{\partial \mathcal{L}}{\partial x} = rac{\partial \mathcal{L}}{\partial y},\Big(I + rac{\partial F}{\partial x}\Big) ]
The identity term (I) means gradients don’t have to pass only through (F), which makes optimization easier in practice.
4) From scratch in NumPy: a tiny residual CNN#
In this section we implement a small CNN/ResNet-like model without autograd:
Conv2Dforward + backward (viaim2col/col2im)ReLU,MaxPool2D,Flatten,Lineara
ResidualBlock: (y = \mathrm{ReLU}(x + F(x)))softmax cross-entropy loss
SGD with momentum
This is not meant to be the fastest implementation — it’s meant to be readable and faithful to the math.
from dataclasses import dataclass
def _pad2d(x: np.ndarray, pad: int) -> np.ndarray:
if pad <= 0:
return x
return np.pad(x, ((0, 0), (0, 0), (pad, pad), (pad, pad)), mode='constant')
def im2col(x: np.ndarray, kH: int, kW: int, stride: int, pad: int) -> tuple[np.ndarray, tuple[int,int,int,int]]:
"""
x: (N, C, H, W)
Returns:
cols: (N*out_h*out_w, C*kH*kW)
out_shape: (out_h, out_w, H_padded, W_padded) for col2im
"""
N, C, H, W = x.shape
x_p = _pad2d(x, pad)
_, _, Hp, Wp = x_p.shape
out_h = (Hp - kH) // stride + 1
out_w = (Wp - kW) // stride + 1
cols = np.empty((N, C, kH, kW, out_h, out_w), dtype=x.dtype)
for i in range(kH):
i_end = i + stride * out_h
for j in range(kW):
j_end = j + stride * out_w
cols[:, :, i, j, :, :] = x_p[:, :, i:i_end:stride, j:j_end:stride]
cols = cols.transpose(0, 4, 5, 1, 2, 3).reshape(N * out_h * out_w, C * kH * kW)
return cols, (out_h, out_w, Hp, Wp)
def col2im(cols: np.ndarray, x_shape: tuple[int,int,int,int], kH: int, kW: int, stride: int, pad: int, out_shape: tuple[int,int,int,int]) -> np.ndarray:
N, C, H, W = x_shape
out_h, out_w, Hp, Wp = out_shape
cols_reshaped = cols.reshape(N, out_h, out_w, C, kH, kW).transpose(0, 3, 4, 5, 1, 2)
x_p = np.zeros((N, C, Hp, Wp), dtype=cols.dtype)
for i in range(kH):
i_end = i + stride * out_h
for j in range(kW):
j_end = j + stride * out_w
x_p[:, :, i:i_end:stride, j:j_end:stride] += cols_reshaped[:, :, i, j, :, :]
if pad <= 0:
return x_p
return x_p[:, :, pad:-pad, pad:-pad]
class Conv2D:
def __init__(self, c_in: int, c_out: int, k: int, stride: int = 1, pad: int = 0, rng: np.random.Generator | None = None):
self.c_in = c_in
self.c_out = c_out
self.k = k
self.stride = stride
self.pad = pad
rng = rng or np.random.default_rng(0)
scale = np.sqrt(2.0 / (c_in * k * k)) # He init for ReLU
self.W = (rng.standard_normal((c_out, c_in, k, k)).astype(np.float32) * scale)
self.b = np.zeros((c_out,), dtype=np.float32)
self.dW = np.zeros_like(self.W)
self.db = np.zeros_like(self.b)
self._cache = None
def forward(self, x: np.ndarray) -> np.ndarray:
# x: (N, C_in, H, W)
cols, out_shape = im2col(x, self.k, self.k, self.stride, self.pad)
W_col = self.W.reshape(self.c_out, -1)
out = cols @ W_col.T + self.b[None, :]
N, _, H, W = x.shape
out_h, out_w, _, _ = out_shape
out = out.reshape(N, out_h, out_w, self.c_out).transpose(0, 3, 1, 2)
self._cache = (x, cols, out_shape)
return out
def backward(self, dout: np.ndarray) -> np.ndarray:
x, cols, out_shape = self._cache
N, C_in, H, W = x.shape
# dout: (N, C_out, out_h, out_w)
dout_col = dout.transpose(0, 2, 3, 1).reshape(-1, self.c_out)
self.db[...] = dout_col.sum(axis=0)
W_col = self.W.reshape(self.c_out, -1)
self.dW[...] = (dout_col.T @ cols).reshape(self.W.shape)
dcols = dout_col @ W_col
dx = col2im(dcols, x.shape, self.k, self.k, self.stride, self.pad, out_shape)
return dx
def step(self, lr: float, vW: np.ndarray, vb: np.ndarray, weight_decay: float) -> tuple[np.ndarray, np.ndarray]:
# SGD + momentum with weight decay
vW = MOMENTUM * vW - lr * (self.dW + weight_decay * self.W)
vb = MOMENTUM * vb - lr * self.db
self.W += vW
self.b += vb
return vW, vb
class ReLU:
def __init__(self):
self._mask = None
def forward(self, x: np.ndarray) -> np.ndarray:
self._mask = x > 0
return x * self._mask
def backward(self, dout: np.ndarray) -> np.ndarray:
return dout * self._mask
class MaxPool2D:
def __init__(self, pool: int = 2, stride: int = 2):
self.pool = pool
self.stride = stride
self._cache = None
def forward(self, x: np.ndarray) -> np.ndarray:
N, C, H, W = x.shape
p, s = self.pool, self.stride
assert H % p == 0 and W % p == 0, 'For simplicity we assume H and W divisible by pool size.'
out_h, out_w = H // p, W // p
x_reshaped = x.reshape(N, C, out_h, p, out_w, p)
out = x_reshaped.max(axis=(3, 5))
self._cache = (x, x_reshaped, out)
return out
def backward(self, dout: np.ndarray) -> np.ndarray:
x, x_reshaped, out = self._cache
N, C, out_h, p, out_w, _ = x_reshaped.shape
mask = x_reshaped == out[:, :, :, None, :, None]
denom = mask.sum(axis=(3, 5), keepdims=True)
denom = np.where(denom == 0, 1, denom)
dx_reshaped = mask * (dout[:, :, :, None, :, None] / denom)
dx = dx_reshaped.reshape(x.shape)
return dx
class Flatten:
def __init__(self):
self._shape = None
def forward(self, x: np.ndarray) -> np.ndarray:
self._shape = x.shape
return x.reshape(x.shape[0], -1)
def backward(self, dout: np.ndarray) -> np.ndarray:
return dout.reshape(self._shape)
class Linear:
def __init__(self, fan_in: int, fan_out: int, rng: np.random.Generator | None = None):
rng = rng or np.random.default_rng(0)
scale = np.sqrt(2.0 / fan_in)
self.W = (rng.standard_normal((fan_in, fan_out)).astype(np.float32) * scale)
self.b = np.zeros((fan_out,), dtype=np.float32)
self.dW = np.zeros_like(self.W)
self.db = np.zeros_like(self.b)
self._cache = None
def forward(self, x: np.ndarray) -> np.ndarray:
self._cache = x
return x @ self.W + self.b
def backward(self, dout: np.ndarray) -> np.ndarray:
x = self._cache
self.dW[...] = x.T @ dout
self.db[...] = dout.sum(axis=0)
return dout @ self.W.T
def step(self, lr: float, vW: np.ndarray, vb: np.ndarray, weight_decay: float) -> tuple[np.ndarray, np.ndarray]:
vW = MOMENTUM * vW - lr * (self.dW + weight_decay * self.W)
vb = MOMENTUM * vb - lr * self.db
self.W += vW
self.b += vb
return vW, vb
class ResidualBlock:
def __init__(self, channels: int, k: int = 3, pad: int = 1, rng: np.random.Generator | None = None):
rng = rng or np.random.default_rng(0)
self.conv1 = Conv2D(channels, channels, k=k, stride=1, pad=pad, rng=rng)
self.relu1 = ReLU()
self.conv2 = Conv2D(channels, channels, k=k, stride=1, pad=pad, rng=rng)
self.relu2 = ReLU()
def forward(self, x: np.ndarray) -> np.ndarray:
out = self.conv1.forward(x)
out = self.relu1.forward(out)
out = self.conv2.forward(out)
out = out + x # skip connection
out = self.relu2.forward(out)
return out
def backward(self, dout: np.ndarray) -> np.ndarray:
dout = self.relu2.backward(dout)
dskip = dout # gradient through identity path
dout = self.conv2.backward(dout)
dout = self.relu1.backward(dout)
dout = self.conv1.backward(dout)
dx = dout + dskip
return dx
def softmax_cross_entropy(logits: np.ndarray, y: np.ndarray) -> tuple[float, np.ndarray]:
# logits: (N, K), y: (N,)
logits = logits.astype(np.float32)
logits = logits - logits.max(axis=1, keepdims=True)
exp = np.exp(logits)
probs = exp / exp.sum(axis=1, keepdims=True)
N = logits.shape[0]
loss = -np.log(probs[np.arange(N), y] + 1e-12).mean()
dlogits = probs
dlogits[np.arange(N), y] -= 1
dlogits /= N
return float(loss), dlogits
Optional: quick gradient check (small and slow)#
If you’re writing low-level backprop code, a simple finite-difference gradient check helps catch mistakes.
We keep this off by default.
def grad_check_conv2d():
rng_local = np.random.default_rng(0)
x = rng_local.standard_normal((2, 1, 6, 6)).astype(np.float32)
conv = Conv2D(1, 2, k=3, stride=1, pad=1, rng=rng_local)
# Forward
out = conv.forward(x)
dout = rng_local.standard_normal(out.shape).astype(np.float32)
dx = conv.backward(dout)
# Numerical check for a few random weight entries
eps = 1e-3
for _ in range(10):
oc = int(rng_local.integers(0, conv.W.shape[0]))
ic = int(rng_local.integers(0, conv.W.shape[1]))
i = int(rng_local.integers(0, conv.W.shape[2]))
j = int(rng_local.integers(0, conv.W.shape[3]))
old = conv.W[oc, ic, i, j]
conv.W[oc, ic, i, j] = old + eps
out_p = conv.forward(x)
loss_p = float((out_p * dout).sum())
conv.W[oc, ic, i, j] = old - eps
out_m = conv.forward(x)
loss_m = float((out_m * dout).sum())
conv.W[oc, ic, i, j] = old
num = (loss_p - loss_m) / (2 * eps)
ana = float(conv.dW[oc, ic, i, j])
rel_err = abs(num - ana) / (abs(num) + abs(ana) + 1e-12)
print(f'W[{oc},{ic},{i},{j}] num={num:.5f} ana={ana:.5f} rel_err={rel_err:.3e}')
if RUN_GRAD_CHECK:
grad_check_conv2d()
Train the NumPy model#
Model (tiny ResNet-ish CNN):
Conv(1→8, 3×3, pad=1) + ReLU
ResidualBlock(8 channels)
MaxPool(2×2)
Flatten
Linear(8·4·4 → 10)
We track loss and accuracy each epoch.
# Build model
rng_model = np.random.default_rng(SEED)
conv1 = Conv2D(1, 8, k=3, stride=1, pad=1, rng=rng_model)
relu1 = ReLU()
res1 = ResidualBlock(8, k=3, pad=1, rng=rng_model)
pool = MaxPool2D(pool=2, stride=2)
flat = Flatten()
fc = Linear(8 * 4 * 4, 10, rng=rng_model)
# Momentum buffers
v_conv1_W = np.zeros_like(conv1.W)
v_conv1_b = np.zeros_like(conv1.b)
v_res1_c1_W = np.zeros_like(res1.conv1.W)
v_res1_c1_b = np.zeros_like(res1.conv1.b)
v_res1_c2_W = np.zeros_like(res1.conv2.W)
v_res1_c2_b = np.zeros_like(res1.conv2.b)
v_fc_W = np.zeros_like(fc.W)
v_fc_b = np.zeros_like(fc.b)
def forward_numpy(xb: np.ndarray) -> np.ndarray:
out = conv1.forward(xb)
out = relu1.forward(out)
out = res1.forward(out)
out = pool.forward(out)
out = flat.forward(out)
out = fc.forward(out)
return out
def backward_numpy(dlogits: np.ndarray) -> None:
dout = fc.backward(dlogits)
dout = flat.backward(dout)
dout = pool.backward(dout)
dout = res1.backward(dout)
dout = relu1.backward(dout)
_ = conv1.backward(dout)
def step_numpy(lr: float) -> None:
global v_conv1_W, v_conv1_b
global v_res1_c1_W, v_res1_c1_b, v_res1_c2_W, v_res1_c2_b
global v_fc_W, v_fc_b
v_conv1_W, v_conv1_b = conv1.step(lr, v_conv1_W, v_conv1_b, WEIGHT_DECAY)
v_res1_c1_W, v_res1_c1_b = res1.conv1.step(lr, v_res1_c1_W, v_res1_c1_b, WEIGHT_DECAY)
v_res1_c2_W, v_res1_c2_b = res1.conv2.step(lr, v_res1_c2_W, v_res1_c2_b, WEIGHT_DECAY)
v_fc_W, v_fc_b = fc.step(lr, v_fc_W, v_fc_b, WEIGHT_DECAY)
def predict_numpy(x: np.ndarray) -> np.ndarray:
logits = forward_numpy(x)
return logits.argmax(axis=1)
def iterate_minibatches(X_: np.ndarray, y_: np.ndarray, batch_size: int, rng_: np.random.Generator):
idx = rng_.permutation(len(X_))
for start in range(0, len(X_), batch_size):
sl = idx[start:start + batch_size]
yield X_[sl], y_[sl]
history_numpy = []
start = time.time()
for epoch in range(1, EPOCHS_NUMPY + 1):
# Train
losses = []
for xb, yb in iterate_minibatches(X_train, y_train, BATCH_SIZE, rng):
logits = forward_numpy(xb)
loss, dlogits = softmax_cross_entropy(logits, yb)
losses.append(loss)
backward_numpy(dlogits)
step_numpy(LR_NUMPY)
# Eval
yhat_train = predict_numpy(X_train)
yhat_test = predict_numpy(X_test)
train_acc = accuracy_score(y_train, yhat_train)
test_acc = accuracy_score(y_test, yhat_test)
history_numpy.append({
'epoch': epoch,
'loss': float(np.mean(losses)),
'train_acc': float(train_acc),
'test_acc': float(test_acc),
})
print(f"[NumPy] epoch {epoch:02d}/{EPOCHS_NUMPY} loss={history_numpy[-1]['loss']:.4f} train_acc={train_acc:.3f} test_acc={test_acc:.3f}")
elapsed = time.time() - start
print(f"NumPy training time: {elapsed:.2f}s")
[NumPy] epoch 01/10 loss=1.6529 train_acc=0.678 test_acc=0.660
[NumPy] epoch 02/10 loss=0.3443 train_acc=0.941 test_acc=0.953
[NumPy] epoch 03/10 loss=0.1182 train_acc=0.975 test_acc=0.964
[NumPy] epoch 04/10 loss=0.1146 train_acc=0.974 test_acc=0.956
[NumPy] epoch 05/10 loss=0.0602 train_acc=0.988 test_acc=0.969
[NumPy] epoch 06/10 loss=0.0358 train_acc=0.990 test_acc=0.971
[NumPy] epoch 07/10 loss=0.0244 train_acc=0.993 test_acc=0.978
[NumPy] epoch 08/10 loss=0.0231 train_acc=0.996 test_acc=0.973
[NumPy] epoch 09/10 loss=0.0153 train_acc=1.000 test_acc=0.978
[NumPy] epoch 10/10 loss=0.0050 train_acc=1.000 test_acc=0.976
NumPy training time: 2.58s
dfn = pd.DataFrame(history_numpy)
fig = go.Figure()
fig.add_trace(go.Scatter(x=dfn['epoch'], y=dfn['loss'], mode='lines+markers', name='loss'))
fig.update_layout(title='NumPy training loss', xaxis_title='epoch', yaxis_title='cross-entropy')
fig.show()
fig = go.Figure()
fig.add_trace(go.Scatter(x=dfn['epoch'], y=dfn['train_acc'], mode='lines+markers', name='train'))
fig.add_trace(go.Scatter(x=dfn['epoch'], y=dfn['test_acc'], mode='lines+markers', name='test'))
fig.update_layout(title='NumPy accuracy', xaxis_title='epoch', yaxis_title='accuracy', yaxis=dict(range=[0,1]))
fig.show()
5) Visual diagnostics (NumPy)#
A few quick sanity checks that often help with CNNs:
learned filters in the first conv layer
feature maps (activations) for a single image
confusion matrix and misclassifications
# First-layer filters: (8, 1, 3, 3)
W1 = conv1.W[:, 0] # (8, 3, 3)
fig = px.imshow(
W1,
facet_col=0,
facet_col_wrap=4,
color_continuous_scale='RdBu',
zmin=float(W1.min()),
zmax=float(W1.max()),
title='NumPy: learned Conv1 filters (3×3)',
)
for a in fig.layout.annotations:
a.text = ''
fig.update_layout(coloraxis_showscale=False)
fig.show()
# Feature maps after the first conv+relu (before residual block)
x0 = X_test[0:1]
out1 = relu1.forward(conv1.forward(x0)) # (1, 8, 8, 8)
fig = px.imshow(
out1[0],
facet_col=0,
facet_col_wrap=4,
color_continuous_scale='Viridis',
title='NumPy: feature maps after Conv1 + ReLU (one test image)',
)
for a in fig.layout.annotations:
a.text = ''
fig.update_layout(coloraxis_showscale=False)
fig.show()
yhat = predict_numpy(X_test)
cm = confusion_matrix(y_test, yhat, labels=list(range(10)))
fig = px.imshow(
cm,
text_auto=True,
color_continuous_scale='Blues',
title='NumPy: confusion matrix (test set)',
labels=dict(x='pred', y='true', color='count'),
)
fig.update_xaxes(tickmode='array', tickvals=list(range(10)))
fig.update_yaxes(tickmode='array', tickvals=list(range(10)))
fig.show()
# A quick look at some NumPy misclassifications
yhat = predict_numpy(X_test)
mis = np.where(yhat != y_test)[0]
n_show = min(36, len(mis))
mis = mis[:n_show]
imgs = X_test[mis, 0]
texts = [f"true={int(y_test[i])}, pred={int(yhat[i])}" for i in mis]
fig = px.imshow(
imgs,
facet_col=0,
facet_col_wrap=9,
color_continuous_scale='gray',
title=f'NumPy: misclassified test images (first {n_show})',
)
for a, t in zip(fig.layout.annotations, texts):
a.text = t
fig.update_xaxes(showticklabels=False).update_yaxes(showticklabels=False)
fig.update_layout(coloraxis_showscale=False, margin=dict(l=10, r=10, t=60, b=10))
fig.show()
6) Practical implementation in PyTorch#
Now we implement the same idea in PyTorch:
define a small residual CNN using
nn.Moduletrain with Adam
visualize curves and a few predictions
Compared to NumPy, PyTorch gives you:
automatic differentiation (autograd)
optimized kernels
higher-level building blocks (Conv2d, BatchNorm, etc.)
if not TORCH_AVAILABLE:
raise RuntimeError(f"PyTorch not available: {_TORCH_IMPORT_ERROR}")
# Device (suppress noisy CUDA init warnings in restricted environments)
import warnings
with warnings.catch_warnings():
warnings.filterwarnings('ignore', message='CUDA initialization:.*')
cuda_ok = bool(torch.cuda.is_available())
device = torch.device('cuda' if cuda_ok else 'cpu')
print('device:', device)
# Tensors
Xtr = torch.from_numpy(X_train).to(device)
ytr = torch.from_numpy(y_train).to(device)
Xte = torch.from_numpy(X_test).to(device)
yte = torch.from_numpy(y_test).to(device)
train_loader = DataLoader(TensorDataset(Xtr, ytr), batch_size=BATCH_SIZE, shuffle=True)
train_eval_loader = DataLoader(TensorDataset(Xtr, ytr), batch_size=256, shuffle=False)
test_loader = DataLoader(TensorDataset(Xte, yte), batch_size=256, shuffle=False)
class ResBlock(nn.Module):
def __init__(self, ch: int):
super().__init__()
self.conv1 = nn.Conv2d(ch, ch, kernel_size=3, padding=1, bias=False)
self.bn1 = nn.BatchNorm2d(ch)
self.conv2 = nn.Conv2d(ch, ch, kernel_size=3, padding=1, bias=False)
self.bn2 = nn.BatchNorm2d(ch)
def forward(self, x):
out = F.relu(self.bn1(self.conv1(x)))
out = self.bn2(self.conv2(out))
out = F.relu(out + x)
return out
class TinyResNet(nn.Module):
def __init__(self):
super().__init__()
self.stem = nn.Sequential(
nn.Conv2d(1, 16, kernel_size=3, padding=1, bias=False),
nn.BatchNorm2d(16),
nn.ReLU(),
)
self.block1 = ResBlock(16)
self.pool = nn.MaxPool2d(kernel_size=2)
self.head = nn.Linear(16 * 4 * 4, 10)
def forward(self, x):
x = self.stem(x)
x = self.block1(x)
x = self.pool(x)
x = x.flatten(1)
return self.head(x)
torch.manual_seed(SEED)
model = TinyResNet().to(device)
opt = torch.optim.Adam(model.parameters(), lr=LR_TORCH, weight_decay=WEIGHT_DECAY)
def eval_loader(loader):
model.eval()
ys = []
yhats = []
losses = []
with torch.no_grad():
for xb, yb in loader:
logits = model(xb)
loss = F.cross_entropy(logits, yb)
losses.append(loss.item())
ys.append(yb.detach().cpu().numpy())
yhats.append(logits.argmax(dim=1).detach().cpu().numpy())
y_all = np.concatenate(ys)
yhat_all = np.concatenate(yhats)
return float(np.mean(losses)), float(accuracy_score(y_all, yhat_all))
history_torch = []
start = time.time()
for epoch in range(1, EPOCHS_TORCH + 1):
model.train()
batch_losses = []
for xb, yb in train_loader:
opt.zero_grad(set_to_none=True)
logits = model(xb)
loss = F.cross_entropy(logits, yb)
loss.backward()
opt.step()
batch_losses.append(loss.item())
train_loss, train_acc = eval_loader(train_eval_loader)
test_loss, test_acc = eval_loader(test_loader)
history_torch.append({
'epoch': epoch,
'loss': float(np.mean(batch_losses)),
'train_acc': train_acc,
'test_acc': test_acc,
'train_loss_eval': train_loss,
'test_loss_eval': test_loss,
})
print(f"[Torch] epoch {epoch:02d}/{EPOCHS_TORCH} loss={history_torch[-1]['loss']:.4f} train_acc={train_acc:.3f} test_acc={test_acc:.3f}")
elapsed = time.time() - start
print(f"Torch training time: {elapsed:.2f}s")
device: cpu
[Torch] epoch 01/10 loss=1.7513 train_acc=0.731 test_acc=0.693
[Torch] epoch 02/10 loss=0.7211 train_acc=0.938 test_acc=0.920
[Torch] epoch 03/10 loss=0.3630 train_acc=0.970 test_acc=0.960
[Torch] epoch 04/10 loss=0.2235 train_acc=0.976 test_acc=0.967
[Torch] epoch 05/10 loss=0.1704 train_acc=0.987 test_acc=0.982
[Torch] epoch 06/10 loss=0.1335 train_acc=0.990 test_acc=0.982
[Torch] epoch 07/10 loss=0.0961 train_acc=0.993 test_acc=0.984
[Torch] epoch 08/10 loss=0.0769 train_acc=0.997 test_acc=0.982
[Torch] epoch 09/10 loss=0.0609 train_acc=0.996 test_acc=0.987
[Torch] epoch 10/10 loss=0.0495 train_acc=0.998 test_acc=0.989
Torch training time: 1.07s
dft = pd.DataFrame(history_torch)
fig = go.Figure()
fig.add_trace(go.Scatter(x=dft['epoch'], y=dft['loss'], mode='lines+markers', name='train_loss(batch)'))
fig.add_trace(go.Scatter(x=dft['epoch'], y=dft['test_loss_eval'], mode='lines+markers', name='test_loss'))
fig.update_layout(title='Torch loss', xaxis_title='epoch', yaxis_title='cross-entropy')
fig.show()
fig = go.Figure()
fig.add_trace(go.Scatter(x=dft['epoch'], y=dft['train_acc'], mode='lines+markers', name='train'))
fig.add_trace(go.Scatter(x=dft['epoch'], y=dft['test_acc'], mode='lines+markers', name='test'))
fig.update_layout(title='Torch accuracy', xaxis_title='epoch', yaxis_title='accuracy', yaxis=dict(range=[0,1]))
fig.show()
model.eval()
with torch.no_grad():
logits = model(Xte)
yhat = logits.argmax(dim=1).detach().cpu().numpy()
cm = confusion_matrix(y_test, yhat, labels=list(range(10)))
fig = px.imshow(
cm,
text_auto=True,
color_continuous_scale='Blues',
title='Torch: confusion matrix (test set)',
labels=dict(x='pred', y='true', color='count'),
)
fig.update_xaxes(tickmode='array', tickvals=list(range(10)))
fig.update_yaxes(tickmode='array', tickvals=list(range(10)))
fig.show()
# A quick look at some Torch misclassifications
# yhat is produced in the confusion-matrix cell above
mis = np.where(yhat != y_test)[0]
n_show = min(36, len(mis))
mis = mis[:n_show]
imgs = X_test[mis, 0]
texts = [f"true={int(y_test[i])}, pred={int(yhat[i])}" for i in mis]
fig = px.imshow(
imgs,
facet_col=0,
facet_col_wrap=9,
color_continuous_scale='gray',
title=f'Torch: misclassified test images (first {n_show})',
)
for a, t in zip(fig.layout.annotations, texts):
a.text = t
fig.update_xaxes(showticklabels=False).update_yaxes(showticklabels=False)
fig.update_layout(coloraxis_showscale=False, margin=dict(l=10, r=10, t=60, b=10))
fig.show()
7) NumPy vs PyTorch (what to take away)#
NumPy from scratch forces you to understand tensor shapes, gradients, and the mechanics of convolution.
PyTorch lets you scale up: larger models, bigger datasets, GPUs, and a huge ecosystem.
On real vision tasks you almost always prototype/iterate in PyTorch (or similar), but knowing what happens under the hood makes you better at debugging and designing models.
Pitfalls + diagnostics#
If training is unstable: lower the learning rate, check initialization, and verify your loss/gradients.
If accuracy saturates early: try more channels, add another residual block, or train longer.
If NumPy training is very slow: reduce epochs, reduce channels, or use fewer samples.
Exercises#
Add a second residual block and compare curves.
Replace MaxPool with a strided convolution.
Add dropout in the PyTorch head.
(Advanced) Implement BatchNorm in NumPy.
References#
He et al., 2015: Deep Residual Learning for Image Recognition
CS231n notes on CNNs